package tableSink

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.descriptors.{Csv, Kafka, Schema}

object kafkaPiplineTest {

  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // 2.从kafka读取数据
    tableEnv.connect(new Kafka()
      .version("universal")
      .topic("sensor")
      .startFromEarliest()
      .property("zookeeper.connect", "master:2181")
      .property("bootstrap.servers", "master:9092")
    ).withFormat(new Csv) // 序列化反序列化
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE()))
      .createTemporaryTable("kafkaInputTable")


    // 3.查询转换
    // 3.1 简单转换
    val sensorTable: Table = tableEnv.from("kafkaInputTable")
    val sensor1Table: Table = sensorTable.select('id, 'temperature)
      .filter("id == 'sensor_1'")

    // 3.2 聚合转换
    sensorTable.groupBy('id)
      .select('id as 'idcounts)

    // 4.输出到kafka
    tableEnv.connect(new Kafka()
      .version("universal")
      .topic("sinktest")
      .startFromEarliest()
      .property("zookeeper.connect", "master:2181")
      .property("bootstrap.servers", "master:9092")
    ).withFormat(new Csv) // 序列化反序列化
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("temp", DataTypes.DOUBLE()))
      .createTemporaryTable("kafkaOutputTable")

    sensor1Table.insertInto("kafkaOutputTable")


    env.execute()
  }

}
